<%
    from codegen.utilities.function_helpers import order_function_parameters_by_optional
    from codegen.utilities.interpreter_helpers import (
        get_c_function_call_template,
        get_instantiation_lines_for_output,
        get_interpreter_functions,
        get_interpreter_parameter_signature,
        get_params_for_function_signature,
        get_return_values,
        is_event_register_function,
        LIBRARY_INTERPRETER_IGNORED_FUNCTIONS,
        INCLUDE_SIZE_HINT_FUNCTIONS,
    )
    from codegen.utilities.text_wrappers import wrap, docstring_wrap

    functions = get_interpreter_functions(data)
%>\
# Do not edit this file; it was automatically generated.

from __future__ import annotations
import ctypes
import logging
import numpy
import platform
import warnings
from enum import Enum
from datetime import timezone
from hightime import datetime as ht_datetime
from hightime import timedelta as ht_timedelta
from typing import Any, TYPE_CHECKING

from collections.abc import Callable, Sequence

from nidaqmx._base_interpreter import BaseEventHandler, BaseInterpreter
from nidaqmx._lib import lib_importer, ctypes_byte_str, c_bool32, wrapped_ndpointer, TaskHandle
from nidaqmx.constants import FillMode, WaveformAttributeMode
from nidaqmx.error_codes import DAQmxErrors, DAQmxWarnings
from nidaqmx.errors import DaqError, DaqFunctionNotSupportedError, DaqReadError, DaqWarning, DaqWriteError
from nidaqmx.types import DriverVersion
from nidaqmx._lib_time import AbsoluteTime
from nidaqmx._waveform_utils import get_num_samps_per_chan
from nitypes.waveform.typing import ExtendedPropertyValue
from nitypes.waveform import AnalogWaveform, DigitalWaveform, SampleIntervalMode, Timing, ExtendedPropertyDictionary

if TYPE_CHECKING:
    from typing_extensions import TypeAlias

_logger = logging.getLogger(__name__)
# None until the first LibraryInterpreter is constructed; set to True once the runtime
# environment registration has been attempted (whether or not it succeeded).
_was_runtime_environment_set = None

# Length of one tick, in seconds (100 ns), of the int64 t0/dt values filled in by the
# internal waveform read functions.
_INT64_WFM_SEC_PER_TICK = 100e-9
# Reference instant (0001-01-01 00:00:00 UTC) to which t0 tick offsets are added.
_T0_EPOCH = ht_datetime(1, 1, 1, tzinfo=timezone.utc)

# ctypes prototype of the waveform-attribute callback that is passed to the
# DAQmxInternalRead*Waveform* functions. It mirrors the C typedef below.
# typedef int32 (CVICALLBACK *DAQmxSetWfmAttrCallbackPtr)(uInt32 channelIndex, const char attributeName[], int32 attributeType, const void* value, uInt32 valueSizeInBytes, void *callbackData);  # noqa: W505 - doc line too long
CSetWfmAttrCallbackPtr = ctypes.CFUNCTYPE(
    ctypes.c_int32,  # return value (error code)
    ctypes.c_uint32,  # channel_index
    ctypes.c_char_p,  # attribute_name
    ctypes.c_int32,  # attribute_type
    ctypes.c_void_p,  # value
    ctypes.c_uint32,  # value_size_in_bytes
    ctypes.c_void_p,  # callback_data
)

class WfmAttrType(Enum):
    """Type codes for the values delivered to the waveform-attribute callback.

    The numeric values are compared against the ``attribute_type`` integer the callback
    receives (see ``LibraryInterpreter._get_wfm_attr_value``).
    """
    BOOL32 = 1  # 4-byte integer, converted to bool
    FLOAT64 = 2  # 8-byte double
    INT32 = 3  # 4-byte signed integer
    STRING = 4  # null-terminated bytes, decoded with lib_importer.encoding

# Python-level attribute callback signature:
# (channel_index, attribute_name, attribute_type, value, callback_data) -> error code.
SetWfmAttrCallback: TypeAlias = Callable[[int, str, WfmAttrType, ExtendedPropertyValue, object], int]

class LibraryEventHandler(BaseEventHandler):
    """Manage the lifetime of a ctypes callback method pointer.

    If DAQmx invokes a callback method pointer that has been garbage collected, the Python
    interpreter will crash. This object holds a reference to the pointer until ``close``
    is called.
    """
    __slots__ = ["_callback_method_ptr"]

    def __init__(self, callback_method_ptr: object) -> None:
        """Store a reference to ``callback_method_ptr`` so it stays alive."""
        self._callback_method_ptr = callback_method_ptr

    def close(self) -> None:
        """Drop the stored reference to the callback method pointer."""
        self._callback_method_ptr = None


class LibraryInterpreter(BaseInterpreter):
    """
    Library C<->Python interpreter.
    This class is responsible for interpreting the Library's C API.

    """
    # Do not add per-task state to the interpreter class.
    __slots__ = ('_driver_version',)

    def __init__(self):
        """Register the Python runtime with the driver once, then cache the driver version.

        The registration is attempted only by the first interpreter created in the process;
        the module-level flag is set afterwards even if the attempt raised.
        """
        global _was_runtime_environment_set
        if _was_runtime_environment_set is None:
            try:
                self.set_runtime_environment(
                    platform.python_implementation(),
                    platform.python_version(),
                    '',
                    '',
                )
            except DaqFunctionNotSupportedError:
                # An unsupported function is not treated as an error here.
                pass
            finally:
                _was_runtime_environment_set = True

        # System-info attribute IDs queried for the major, minor, and update versions.
        version_attribute_ids = (0x1272, 0x1923, 0x2f22)
        major, minor, update = (
            self.get_system_info_attribute_uint32(attribute_id)
            for attribute_id in version_attribute_ids
        )
        self._driver_version = DriverVersion(major, minor, update)

    @property
    def driver_version(self):
        """Return the DriverVersion that was queried when this interpreter was created."""
        return self._driver_version


% for func in functions:
<%
    # Skip functions that are on the ignore list.
    if func.function_name in LIBRARY_INTERPRETER_IGNORED_FUNCTIONS:
        continue
    params = get_params_for_function_signature(func)
    sorted_params = order_function_parameters_by_optional(params)
    parameter_signature = get_interpreter_parameter_signature(is_python_factory, sorted_params)
    # Functions in INCLUDE_SIZE_HINT_FUNCTIONS get an extra size_hint=0 keyword parameter.
    if func.function_name in INCLUDE_SIZE_HINT_FUNCTIONS:
        parameter_signature = ", ".join([parameter_signature, "size_hint=0"])
    return_values = get_return_values(func)
%>\
## Wrap the parameter list onto its own line(s) when name + parameters exceed 68 characters.
    %if (len(func.function_name) + len(parameter_signature)) > 68:
    def ${func.function_name}(
            ${parameter_signature + '):' | wrap(12, 12)}
    %else:
    def ${func.function_name}(${parameter_signature}):
    %endif
\
## Script instantiation for output parameters that will be passed by reference.
<%
    instantiation_lines = get_instantiation_lines_for_output(func)
    %>\
\
%if func.is_init_method and func.is_python_codegen_method:
        new_session_initialized = True
%endif
## Functions not marked is_python_codegen_method get a body that raises NotImplementedError.
%if func.is_python_codegen_method:
    %if len(instantiation_lines) > 0:
        %for instantiation_line in instantiation_lines:
        ${instantiation_line}
        %endfor

    %endif
\
<%include file="${'/library_interpreter' + get_c_function_call_template(func)}" args="function=func" />\
    %if len(list(return_values)) != 0:
        return ${', '.join(return_values)}
    %endif
%else:
        raise NotImplementedError
%endif

%endfor
    ## get_error_string has special error handling.
    def get_error_string(self, error_code):
        """Return the driver's description of ``error_code``.

        If the lookup itself reports a negative status, the failure is logged and a fixed
        fallback message is returned instead of raising.
        """
        buffer_size = 2048
        description = ctypes.create_string_buffer(buffer_size)

        get_error_string_fn = lib_importer.windll.DAQmxGetErrorString
        # Assign argtypes only once; the check is repeated after acquiring arglock.
        if get_error_string_fn.argtypes is None:
            with get_error_string_fn.arglock:
                if get_error_string_fn.argtypes is None:
                    get_error_string_fn.argtypes = [
                        ctypes.c_int, ctypes.c_char_p, ctypes.c_uint]

        status = get_error_string_fn(error_code, description, buffer_size)
        if status >= 0:
            return description.value.decode(lib_importer.encoding)

        _logger.error('Failed to get error string for error code %d. DAQmxGetErrorString returned error code %d.', error_code, status)
        return 'Failed to retrieve error description.'

    ## get_extended_error_info has special error handling and it is library-only because it uses
    ## thread-local storage.
    def get_extended_error_info(self):
        """Return the driver's extended error information as a string.

        If the query itself reports a negative status, the failure is logged and a fixed
        fallback message is returned instead of raising.
        """
        buffer_size = 2048
        info = ctypes.create_string_buffer(buffer_size)

        get_info_fn = lib_importer.windll.DAQmxGetExtendedErrorInfo
        # Assign argtypes only once; the check is repeated after acquiring arglock.
        if get_info_fn.argtypes is None:
            with get_info_fn.arglock:
                if get_info_fn.argtypes is None:
                    get_info_fn.argtypes = [ctypes.c_char_p, ctypes.c_uint]

        status = get_info_fn(info, buffer_size)
        if status >= 0:
            return info.value.decode(lib_importer.encoding)

        _logger.error('Failed to get extended error info. DAQmxGetExtendedErrorInfo returned error code %d.', status)
        return 'Failed to retrieve error description.'

    ## read_analog_waveform has special handling for waveform attributes and callbacks
    def read_analog_waveform(
        self,
        task_handle: object,
        number_of_samples_per_channel: int,
        timeout: float,
        waveform: AnalogWaveform[numpy.float64],
        waveform_attribute_mode: WaveformAttributeMode
    ) -> int:
        """Read one channel of analog data into ``waveform`` and return the samples read.

        Extended properties and timing are only collected when the corresponding flag is
        present in ``waveform_attribute_mode``. The waveform is sized to the requested
        count before the read and to the actual count afterwards; the error check runs
        last, after the waveform has been updated.
        """
        properties = (
            [waveform.extended_properties]
            if WaveformAttributeMode.EXTENDED_PROPERTIES in waveform_attribute_mode
            else None
        )

        t0_ticks = None
        dt_ticks = None
        if WaveformAttributeMode.TIMING in waveform_attribute_mode:
            t0_ticks = numpy.zeros(1, dtype=numpy.int64)
            dt_ticks = numpy.zeros(1, dtype=numpy.int64)

        # Size the waveform to the request before handing its raw_data to the driver.
        waveform.sample_count = number_of_samples_per_channel

        status, count_read = self._internal_read_analog_waveform_ex(
            task_handle,
            number_of_samples_per_channel,
            timeout,
            FillMode.GROUP_BY_CHANNEL.value,
            waveform.raw_data,
            properties,
            t0_ticks,
            dt_ticks,
        )

        waveform.sample_count = count_read

        have_timing = t0_ticks is not None and dt_ticks is not None
        if have_timing:
            self._set_waveform_timings([waveform], t0_ticks, dt_ticks)

        self.check_for_error(status, samps_per_chan_read=count_read)
        return count_read

    ## read_analog_waveforms has special handling for waveform attributes and callbacks
    def read_analog_waveforms(
        self,
        task_handle: object,
        number_of_samples_per_channel: int,
        timeout: float,
        waveforms: Sequence[AnalogWaveform[numpy.float64]],
        waveform_attribute_mode: WaveformAttributeMode
    ) -> int:
        """Read analog data into one waveform per channel and return the samples read.

        All of the waveforms must be the same size. Extended properties and timing are
        only collected when the corresponding flag is present in
        ``waveform_attribute_mode``. The error check runs last, after every waveform has
        been resized to the number of samples actually read.
        """
        properties = (
            [wfm.extended_properties for wfm in waveforms]
            if WaveformAttributeMode.EXTENDED_PROPERTIES in waveform_attribute_mode
            else None
        )

        t0_ticks = None
        dt_ticks = None
        if WaveformAttributeMode.TIMING in waveform_attribute_mode:
            t0_ticks = numpy.zeros(len(waveforms), dtype=numpy.int64)
            dt_ticks = numpy.zeros(len(waveforms), dtype=numpy.int64)

        # Size every waveform to the request before handing its raw_data to the driver.
        for wfm in waveforms:
            wfm.sample_count = number_of_samples_per_channel

        status, count_read = self._internal_read_analog_waveform_per_chan(
            task_handle,
            number_of_samples_per_channel,
            timeout,
            [wfm.raw_data for wfm in waveforms],
            properties,
            t0_ticks,
            dt_ticks,
        )

        for wfm in waveforms:
            wfm.sample_count = count_read

        have_timing = t0_ticks is not None and dt_ticks is not None
        if have_timing:
            self._set_waveform_timings(waveforms, t0_ticks, dt_ticks)

        self.check_for_error(status, samps_per_chan_read=count_read)
        return count_read

    def _internal_read_analog_waveform_ex(
        self,
        task_handle: object,
        number_of_samples_per_channel: int,
        timeout: float,
        fill_mode: int,
        read_array: numpy.typing.NDArray[numpy.float64],
        properties: Sequence[ExtendedPropertyDictionary] | None,
        t0_array: numpy.typing.NDArray[numpy.int64] | None,
        dt_array: numpy.typing.NDArray[numpy.int64] | None,
    ) -> tuple[
        int, # error code
        int, # The number of samples per channel that were read
    ]:
        """Call DAQmxInternalReadAnalogWaveformEx, filling ``read_array`` in place.

        The error code is returned rather than checked here; callers are expected to
        pass it to ``check_for_error`` themselves.

        Args:
            task_handle: Must be a ``TaskHandle`` instance.
            number_of_samples_per_channel: Sample count forwarded to the C function.
            timeout: Timeout forwarded to the C function.
            fill_mode: Integer fill mode forwarded to the C function.
            read_array: Destination buffer; its ``size`` is passed as the array size.
            properties: Per-channel dictionaries filled by the attribute callback, or
                None to pass an empty callback pointer.
            t0_array: Receives t0 values, or None (a size of 0 is passed in that case).
            dt_array: Receives dt values, or None.

        Returns:
            ``(error_code, samples_per_channel_read)``.
        """
        assert isinstance(task_handle, TaskHandle)
        samps_per_chan_read = ctypes.c_int()

        cfunc = lib_importer.windll.DAQmxInternalReadAnalogWaveformEx
        # Assign argtypes only once; the check is repeated after acquiring arglock.
        if cfunc.argtypes is None:
            with cfunc.arglock:
                if cfunc.argtypes is None:
                    cfunc.argtypes = [
                        TaskHandle,
                        ctypes.c_int,
                        ctypes.c_double,
                        ctypes.c_int,
                        wrapped_ndpointer(dtype=numpy.int64, flags=("C", "W")),
                        wrapped_ndpointer(dtype=numpy.int64, flags=("C", "W")),
                        ctypes.c_uint,
                        CSetWfmAttrCallbackPtr,
                        ctypes.c_void_p,
                        wrapped_ndpointer(dtype=numpy.float64, flags=("C", "W")),
                        ctypes.c_uint,
                        ctypes.POINTER(ctypes.c_int),
                        ctypes.POINTER(c_bool32),
                    ]

        # The positional arguments below follow the argtypes list above one-to-one.
        error_code = cfunc(
            task_handle,
            number_of_samples_per_channel,
            timeout,
            fill_mode,
            t0_array,
            dt_array,
            0 if t0_array is None else t0_array.size,
            self._get_wfm_attr_callback(properties),
            None,
            read_array,
            read_array.size,
            ctypes.byref(samps_per_chan_read),
            None,
        )

        return error_code, samps_per_chan_read.value

    def _internal_read_analog_waveform_per_chan(
        self,
        task_handle: object,
        num_samps_per_chan: int,
        timeout: float,
        read_arrays: Sequence[numpy.typing.NDArray[numpy.float64]],
        properties: Sequence[ExtendedPropertyDictionary] | None,
        t0_array: numpy.typing.NDArray[numpy.int64] | None,
        dt_array: numpy.typing.NDArray[numpy.int64] | None,
    ) -> tuple[
        int, # error code
        int, # The number of samples per channel that were read
    ]:
        """Call DAQmxInternalReadAnalogWaveformPerChan, filling each array in place.

        Like ``_internal_read_analog_waveform_ex``, this returns the error code instead
        of raising, so the caller can update its waveforms (sample count, timing) from a
        partial read before calling ``check_for_error`` exactly once.

        Args:
            task_handle: Must be a ``TaskHandle`` instance.
            num_samps_per_chan: Sample count forwarded to the C function.
            timeout: Timeout forwarded to the C function.
            read_arrays: One destination buffer per channel; all must have the same size.
            properties: Per-channel dictionaries filled by the attribute callback, or
                None to pass an empty callback pointer.
            t0_array: Receives t0 values, or None (a size of 0 is passed in that case).
            dt_array: Receives dt values, or None.

        Returns:
            ``(error_code, samples_per_channel_read)``.
        """
        assert isinstance(task_handle, TaskHandle)
        samps_per_chan_read = ctypes.c_int()

        channel_count = len(read_arrays)
        assert channel_count > 0
        array_size = read_arrays[0].size
        assert all(read_array.size == array_size for read_array in read_arrays)

        cfunc = lib_importer.windll.DAQmxInternalReadAnalogWaveformPerChan
        # Assign argtypes only once; the check is repeated after acquiring arglock.
        if cfunc.argtypes is None:
            with cfunc.arglock:
                if cfunc.argtypes is None:
                    cfunc.argtypes = [
                        TaskHandle,
                        ctypes.c_int,
                        ctypes.c_double,
                        wrapped_ndpointer(dtype=numpy.int64, flags=("C", "W")),
                        wrapped_ndpointer(dtype=numpy.int64, flags=("C", "W")),
                        ctypes.c_uint,
                        CSetWfmAttrCallbackPtr,
                        ctypes.c_void_p,
                        ctypes.POINTER(ctypes.POINTER(ctypes.c_double)),
                        ctypes.c_uint,
                        ctypes.c_uint,
                        ctypes.POINTER(ctypes.c_int),
                        ctypes.POINTER(c_bool32),
                    ]

        # Build a C array holding one double* per channel buffer.
        read_array_ptrs = (ctypes.POINTER(ctypes.c_double) * channel_count)()
        for i, read_array in enumerate(read_arrays):
            read_array_ptrs[i] = read_array.ctypes.data_as(ctypes.POINTER(ctypes.c_double))

        # The positional arguments below follow the argtypes list above one-to-one.
        error_code = cfunc(
            task_handle,
            num_samps_per_chan,
            timeout,
            t0_array,
            dt_array,
            0 if t0_array is None else t0_array.size,
            self._get_wfm_attr_callback(properties),
            None,
            read_array_ptrs,
            channel_count,
            array_size,
            ctypes.byref(samps_per_chan_read),
            None,
        )

        # Deliberately no check_for_error here: the caller performs it after it has
        # applied the samples-read count to its waveforms.
        return error_code, samps_per_chan_read.value

    def _get_wfm_attr_callback(self, properties):
        """Return the C callback pointer used to collect waveform attributes.

        When ``properties`` is None an empty ``CSetWfmAttrCallbackPtr`` is returned.
        Otherwise the returned callback stores each reported attribute into
        ``properties[channel_index]`` under its attribute name and reports success (0).
        """
        if properties is None:
            return CSetWfmAttrCallbackPtr()

        def store_attribute(
            channel_index: int,
            attribute_name: str,
            attribute_type: WfmAttrType,
            value: ExtendedPropertyValue,
            callback_data: object,
        ) -> int:
            properties[channel_index][attribute_name] = value
            return 0

        return self._get_wfm_attr_callback_ptr(store_attribute)

    def _get_wfm_attr_value(
        self, attribute_type: int, value: ctypes.c_void_p, value_size_in_bytes: int
    ) -> ExtendedPropertyValue:
        """Decode the raw value passed to the waveform-attribute callback.

        Args:
            attribute_type: Integer type code; compared against ``WfmAttrType`` values.
            value: Address of the raw value.
            value_size_in_bytes: Size of the raw value. Strings include the trailing null.

        Returns:
            A ``bool``, ``float``, ``int`` or ``str`` depending on ``attribute_type``.

        Raises:
            ValueError: If the type code is unsupported, the size does not match the
                type, or a string value is empty or not null-terminated.
        """
        def require_size(expected: int) -> None:
            # Explicit check instead of assert so it still runs under python -O.
            if value_size_in_bytes != expected:
                raise ValueError(
                    f"Attribute type {attribute_type} expects {expected} bytes, "
                    f"but got {value_size_in_bytes}"
                )

        if attribute_type == WfmAttrType.BOOL32.value:
            require_size(4)
            return ctypes.cast(value, ctypes.POINTER(ctypes.c_int32))[0] != 0
        if attribute_type == WfmAttrType.FLOAT64.value:
            require_size(8)
            return float(ctypes.cast(value, ctypes.POINTER(ctypes.c_double))[0])
        if attribute_type == WfmAttrType.INT32.value:
            require_size(4)
            return int(ctypes.cast(value, ctypes.POINTER(ctypes.c_int32))[0])
        if attribute_type == WfmAttrType.STRING.value:
            if value_size_in_bytes < 1:
                # A size of 0 would otherwise index the byte before the buffer.
                raise ValueError("String attribute value must include a null terminator")
            # Unsigned bytes are required: with signed c_byte, any byte >= 0x80 becomes a
            # negative int and bytes() rejects it, breaking non-ASCII strings.
            value_c_bytes = ctypes.cast(value, ctypes.POINTER(ctypes.c_ubyte))
            if value_c_bytes[value_size_in_bytes - 1] != 0:
                raise ValueError("String attribute value is not null-terminated")
            return bytes(value_c_bytes[0 : value_size_in_bytes - 1]).decode(lib_importer.encoding)
        raise ValueError(f"Unsupported attribute type {attribute_type}")

    def _get_wfm_attr_callback_ptr(
        self, set_wfm_attr_callback: SetWfmAttrCallback
    ) -> ctypes._FuncPointer:
        """Wrap a Python attribute callback in a ``CSetWfmAttrCallbackPtr``.

        The wrapper converts the raw C arguments (name bytes, type code, value address)
        to Python values before delegating. Any exception raised while converting or
        delegating is logged and reported to the caller as -1 instead of propagating.
        """
        def _invoke_callback(
            channel_index: int,
            attribute_name: bytes,
            attribute_type: int,
            value: ctypes.c_void_p,
            value_size_in_bytes: int,
            callback_data: object,
        ) -> int:
            try:
                decoded_name = attribute_name.decode(lib_importer.encoding)
                type_enum = WfmAttrType(attribute_type)
                decoded_value = self._get_wfm_attr_value(
                    attribute_type, value, value_size_in_bytes
                )
                return set_wfm_attr_callback(
                    channel_index, decoded_name, type_enum, decoded_value, callback_data
                )
            except Exception:
                _logger.exception("Unhandled exception in set_wfm_attr_callback")
                return -1

        callback_ptr = CSetWfmAttrCallbackPtr(_invoke_callback)
        return callback_ptr

    def _set_waveform_timings(
        self,
        waveforms: Sequence[AnalogWaveform[numpy.float64] | DigitalWaveform[numpy.uint8]],
        t0_array: numpy.typing.NDArray[numpy.int64],
        dt_array: numpy.typing.NDArray[numpy.int64]
    ) -> None:
        """Assign regular-interval timing to each waveform from tick-based t0/dt arrays.

        Element ``i`` of ``t0_array`` is an offset in ticks from ``_T0_EPOCH`` and element
        ``i`` of ``dt_array`` is the sample interval in ticks; both are converted to
        seconds with ``_INT64_WFM_SEC_PER_TICK``.
        """
        for index, wfm in enumerate(waveforms):
            start_time = _T0_EPOCH + ht_timedelta(
                seconds=t0_array[index] * _INT64_WFM_SEC_PER_TICK
            )
            interval = ht_timedelta(seconds=dt_array[index] * _INT64_WFM_SEC_PER_TICK)
            wfm.timing = Timing(
                sample_interval_mode=SampleIntervalMode.REGULAR,
                timestamp=start_time,
                sample_interval=interval,
            )

    ## read_digital_waveform has special handling for waveform attributes and callbacks
    def read_digital_waveform(
        self,
        task_handle: object,
        number_of_samples_per_channel: int,
        timeout: float,
        waveform: DigitalWaveform[Any],
        waveform_attribute_mode: WaveformAttributeMode
    ) -> int:
        """Read one channel of digital data into ``waveform`` and return the samples read.

        Extended properties and timing are only collected when the corresponding flag is
        present in ``waveform_attribute_mode``. The waveform is sized to the requested
        count before the read and to the actual count afterwards; the error check runs
        last, after the waveform has been updated.
        """
        properties = (
            [waveform.extended_properties]
            if WaveformAttributeMode.EXTENDED_PROPERTIES in waveform_attribute_mode
            else None
        )

        t0_ticks = None
        dt_ticks = None
        if WaveformAttributeMode.TIMING in waveform_attribute_mode:
            t0_ticks = numpy.zeros(1, dtype=numpy.int64)
            dt_ticks = numpy.zeros(1, dtype=numpy.int64)

        # Size the waveform to the request before exposing its data to the driver.
        waveform.sample_count = number_of_samples_per_channel

        status, count_read = self._internal_read_digital_waveform(
            task_handle,
            number_of_samples_per_channel,
            timeout,
            FillMode.GROUP_BY_CHANNEL.value,
            self._get_digital_read_array(waveform),
            properties,
            t0_ticks,
            dt_ticks,
            None,
        )

        waveform.sample_count = count_read

        have_timing = t0_ticks is not None and dt_ticks is not None
        if have_timing:
            self._set_waveform_timings([waveform], t0_ticks, dt_ticks)

        self.check_for_error(status, samps_per_chan_read=count_read)
        return count_read

    def _get_digital_read_array(self, waveform: DigitalWaveform[Any]) -> numpy.typing.NDArray[numpy.uint8]:
        """Return ``waveform.data`` as uint8, reinterpreting it as a view when the dtype differs."""
        samples = waveform.data
        return samples if samples.dtype == numpy.uint8 else samples.view(numpy.uint8)

    def read_digital_waveforms(
        self,
        task_handle: object,
        channel_count: int,
        number_of_samples_per_channel: int,
        number_of_signals_per_sample: int,
        timeout: float,
        waveforms: Sequence[DigitalWaveform[Any]],
        waveform_attribute_mode: WaveformAttributeMode,
    ) -> int:
        """Read digital data into one existing waveform per channel.

        The data is read into a temporary contiguous array and then copied into each
        waveform. Extended properties and timing are only collected when the
        corresponding flag is present in ``waveform_attribute_mode``.

        Returns:
            The number of samples per channel that were read.

        Raises:
            ValueError: If a waveform's signal count differs from the per-channel byte
                count reported by the read.
        """
        if WaveformAttributeMode.EXTENDED_PROPERTIES in waveform_attribute_mode:
            properties = [waveform.extended_properties for waveform in waveforms]
        else:
            properties = None

        if WaveformAttributeMode.TIMING in waveform_attribute_mode:
            t0_array = numpy.zeros(channel_count, dtype=numpy.int64)
            dt_array = numpy.zeros(channel_count, dtype=numpy.int64)
        else:
            t0_array = None
            dt_array = None

        # Since there's no DAQmxInternalReadDigitalWaveformPerChan, we have to allocate a
        # temporary contiguous array to read the data from multiple channels into.
        read_array = numpy.zeros(
            (number_of_samples_per_channel, channel_count, number_of_signals_per_sample),
            dtype=numpy.uint8)

        bytes_per_chan_array = numpy.zeros(channel_count, dtype=numpy.uint32)

        error_code, samples_read = self._internal_read_digital_waveform(
            task_handle,
            number_of_samples_per_channel,
            timeout,
            FillMode.GROUP_BY_SCAN_NUMBER.value, # GROUP_BY_SCAN_NUMBER handles short reads better than GROUP_BY_CHANNEL
            read_array,
            properties,
            t0_array,
            dt_array,
            bytes_per_chan_array,
        )

        for i, waveform in enumerate(waveforms):
            waveform.sample_count = samples_read
            waveform_signal_count = waveform.data.shape[1]
            channel_signal_count = bytes_per_chan_array[i]
            if waveform_signal_count != channel_signal_count:
                raise ValueError(f"waveforms[{i}].data has {waveform_signal_count} signals, but expected {channel_signal_count}")
            # Copy only the rows that were read. After the resize above, waveform.data has
            # samples_read rows, so copying the full temporary array would fail to
            # broadcast on a short read and hide the driver error checked below.
            waveform.data[:] = read_array[:samples_read, i, :channel_signal_count]

        if t0_array is not None and dt_array is not None:
            self._set_waveform_timings(waveforms, t0_array, dt_array)

        self.check_for_error(error_code, samps_per_chan_read=samples_read)
        return samples_read

    def read_new_digital_waveforms(
        self,
        task_handle: object,
        channel_count: int,
        number_of_samples_per_channel: int,
        number_of_signals_per_sample: int,
        timeout: float,
        waveform_attribute_mode: WaveformAttributeMode,
    ) -> Sequence[DigitalWaveform[numpy.uint8]]:
        """Read digital data and return one newly created waveform per channel.

        Each waveform is built on a slice of a shared temporary array, limited to the
        per-channel byte count reported by the read. Extended properties and timing are
        only collected when the corresponding flag is present in
        ``waveform_attribute_mode``. The error check runs after the waveforms are built.
        """
        properties = (
            [ExtendedPropertyDictionary() for _ in range(channel_count)]
            if WaveformAttributeMode.EXTENDED_PROPERTIES in waveform_attribute_mode
            else None
        )

        t0_ticks = None
        dt_ticks = None
        if WaveformAttributeMode.TIMING in waveform_attribute_mode:
            t0_ticks = numpy.zeros(channel_count, dtype=numpy.int64)
            dt_ticks = numpy.zeros(channel_count, dtype=numpy.int64)

        buffer_shape = (number_of_samples_per_channel, channel_count, number_of_signals_per_sample)
        read_buffer = numpy.zeros(buffer_shape, dtype=numpy.uint8)
        bytes_per_chan = numpy.zeros(channel_count, dtype=numpy.uint32)

        status, count_read = self._internal_read_digital_waveform(
            task_handle,
            number_of_samples_per_channel,
            timeout,
            FillMode.GROUP_BY_SCAN_NUMBER.value, # GROUP_BY_SCAN_NUMBER handles short reads better than GROUP_BY_CHANNEL
            read_buffer,
            properties,
            t0_ticks,
            dt_ticks,
            bytes_per_chan,
        )

        waveforms = [
            DigitalWaveform(
                sample_count=count_read,
                data=read_buffer[:, channel, :bytes_per_chan[channel]],
                copy_extended_properties=False,
                extended_properties=properties[channel] if properties else None)
            for channel in range(channel_count)
        ]

        have_timing = t0_ticks is not None and dt_ticks is not None
        if have_timing:
            self._set_waveform_timings(waveforms, t0_ticks, dt_ticks)

        self.check_for_error(status, samps_per_chan_read=count_read)
        return waveforms

    def _internal_read_digital_waveform(
        self,
        task_handle: object,
        number_of_samples_per_channel: int,
        timeout: float,
        fill_mode: int,
        read_array: numpy.typing.NDArray[numpy.uint8],
        properties: Sequence[ExtendedPropertyDictionary] | None,
        t0_array: numpy.typing.NDArray[numpy.int64] | None,
        dt_array: numpy.typing.NDArray[numpy.int64] | None,
        bytes_per_chan_array: numpy.typing.NDArray[numpy.uint32] | None = None,
    ) -> tuple[
        int, # error code
        int, # The number of samples per channel that were read
    ]:
        """Call DAQmxInternalReadDigitalWaveform, filling ``read_array`` in place.

        The error code is returned rather than checked here; callers are expected to
        pass it to ``check_for_error`` themselves.

        Args:
            task_handle: Must be a ``TaskHandle`` instance.
            number_of_samples_per_channel: Sample count forwarded to the C function.
            timeout: Timeout forwarded to the C function.
            fill_mode: Integer fill mode forwarded to the C function.
            read_array: Destination buffer; its ``size`` is passed as the array size.
            properties: Per-channel dictionaries filled by the attribute callback, or
                None to pass an empty callback pointer.
            t0_array: Receives t0 values, or None (a size of 0 is passed in that case).
            dt_array: Receives dt values, or None.
            bytes_per_chan_array: Receives per-channel byte counts, or None (a size of 0
                is passed in that case).

        Returns:
            ``(error_code, samples_per_channel_read)``.
        """
        assert isinstance(task_handle, TaskHandle)
        samps_per_chan_read = ctypes.c_int()
        # Output slot required by the C signature; its value is not used by this method.
        num_bytes_per_samp = ctypes.c_int()

        cfunc = lib_importer.windll.DAQmxInternalReadDigitalWaveform
        # Assign argtypes only once; the check is repeated after acquiring arglock.
        if cfunc.argtypes is None:
            with cfunc.arglock:
                if cfunc.argtypes is None:
                    cfunc.argtypes = [
                        TaskHandle,
                        ctypes.c_int,
                        ctypes.c_double,
                        ctypes.c_int,
                        wrapped_ndpointer(dtype=numpy.int64, flags=("C", "W")),
                        wrapped_ndpointer(dtype=numpy.int64, flags=("C", "W")),
                        ctypes.c_uint,
                        CSetWfmAttrCallbackPtr,
                        ctypes.c_void_p,
                        wrapped_ndpointer(dtype=numpy.uint8, flags=("C", "W")),
                        ctypes.c_uint,
                        ctypes.POINTER(ctypes.c_int),
                        ctypes.POINTER(ctypes.c_int),
                        wrapped_ndpointer(dtype=numpy.uint32, flags=("C", "W")),
                        ctypes.c_uint,
                        ctypes.POINTER(c_bool32),
                    ]

        # The positional arguments below follow the argtypes list above one-to-one.
        error_code = cfunc(
            task_handle,
            number_of_samples_per_channel,
            timeout,
            fill_mode,
            t0_array,
            dt_array,
            0 if t0_array is None else t0_array.size,
            self._get_wfm_attr_callback(properties),
            None,
            read_array,
            read_array.size,
            ctypes.byref(samps_per_chan_read),
            ctypes.byref(num_bytes_per_samp),
            bytes_per_chan_array,
            0 if bytes_per_chan_array is None else bytes_per_chan_array.size,
            None,
        )

        return error_code, samps_per_chan_read.value

    ## DAQmxReadIDPinMemory returns the size if given a null pointer.
    ## So, we read 1st time to get the size, then read 2nd time to get the data.
    def read_id_pin_memory(self, device_name, id_pin_name):
        """Read the ID pin memory of a device in two passes.

        The first call passes no buffer and its return value is used as the required
        size; the second call reads into a buffer of that size.

        Returns:
            ``(data as a list of ints, data length read, format code)``.
        """
        bytes_read = ctypes.c_uint()
        memory_format = ctypes.c_uint()

        cfunc = lib_importer.windll.DAQmxReadIDPinMemory
        # Assign argtypes only once; the check is repeated after acquiring arglock.
        if cfunc.argtypes is None:
            with cfunc.arglock:
                if cfunc.argtypes is None:
                    cfunc.argtypes = [
                        ctypes_byte_str,
                        ctypes_byte_str,
                        wrapped_ndpointer(dtype=numpy.uint8, flags=('C','W')),
                        ctypes.c_uint,
                        ctypes.POINTER(ctypes.c_uint),
                        ctypes.POINTER(ctypes.c_uint),
                    ]

        def call_driver(buffer, buffer_size):
            return cfunc(
                device_name, id_pin_name, buffer, buffer_size,
                ctypes.byref(bytes_read), ctypes.byref(memory_format))

        # Size query: a negative result is an error code rather than a size.
        required_size = call_driver(None, 0)
        if required_size < 0:
            self.check_for_error(required_size)

        memory = numpy.zeros(required_size, dtype=numpy.uint8)
        self.check_for_error(call_driver(memory, required_size))
        return memory.tolist(), bytes_read.value, memory_format.value

    ## The metadata for 'read_power_binary_i16' function is not available in daqmxAPISharp.json file.
    def read_power_binary_i16(
            self, task, num_samps_per_chan, timeout, fill_mode,
            read_voltage_array, read_current_array):
        """Read int16 voltage and current samples into the supplied arrays.

        The size of ``read_voltage_array`` is passed as the array size for the call.

        Returns:
            ``(read_voltage_array, read_current_array, samples per channel read)``.
        """
        count_read = ctypes.c_int()

        cfunc = lib_importer.windll.DAQmxReadPowerBinaryI16
        # Assign argtypes only once; the check is repeated after acquiring arglock.
        if cfunc.argtypes is None:
            with cfunc.arglock:
                if cfunc.argtypes is None:
                    cfunc.argtypes = [
                        lib_importer.task_handle,
                        ctypes.c_int,
                        ctypes.c_double,
                        c_bool32,
                        wrapped_ndpointer(dtype=numpy.int16, flags=('C', 'W')),
                        wrapped_ndpointer(dtype=numpy.int16, flags=('C', 'W')),
                        ctypes.c_uint,
                        ctypes.POINTER(ctypes.c_int),
                        ctypes.POINTER(c_bool32),
                    ]

        status = cfunc(
            task,
            num_samps_per_chan,
            timeout,
            fill_mode,
            read_voltage_array,
            read_current_array,
            read_voltage_array.size,
            ctypes.byref(count_read),
            None,
        )
        self.check_for_error(status, samps_per_chan_read=count_read.value)

        return read_voltage_array, read_current_array, count_read.value

    ## The metadata for 'read_power_f64' function is not available in daqmxAPISharp.json file.
    def read_power_f64(
            self, task, num_samps_per_chan, timeout, fill_mode,
            read_voltage_array, read_current_array):
        """Read float64 voltage and current samples into the supplied arrays.

        The size of ``read_voltage_array`` is passed as the array size for the call.

        Returns:
            ``(read_voltage_array, read_current_array, samples per channel read)``.
        """
        count_read = ctypes.c_int()

        cfunc = lib_importer.windll.DAQmxReadPowerF64
        # Assign argtypes only once; the check is repeated after acquiring arglock.
        if cfunc.argtypes is None:
            with cfunc.arglock:
                if cfunc.argtypes is None:
                    cfunc.argtypes = [
                        lib_importer.task_handle,
                        ctypes.c_int,
                        ctypes.c_double,
                        c_bool32,
                        wrapped_ndpointer(dtype=numpy.float64, flags=('C', 'W')),
                        wrapped_ndpointer(dtype=numpy.float64, flags=('C', 'W')),
                        ctypes.c_uint,
                        ctypes.POINTER(ctypes.c_int),
                        ctypes.POINTER(c_bool32),
                    ]

        status = cfunc(
            task,
            num_samps_per_chan,
            timeout,
            fill_mode,
            read_voltage_array,
            read_current_array,
            read_voltage_array.size,
            ctypes.byref(count_read),
            None,
        )
        self.check_for_error(status, samps_per_chan_read=count_read.value)

        return read_voltage_array, read_current_array, count_read.value

    ## The datatype of 'read_array' is incorrect in daqmxAPISharp.json file.
    def read_raw(self, task, num_samps_per_chan, timeout, read_array):
        """Read raw samples into ``read_array``, whatever its dtype.

        ``read_array.nbytes`` is passed as the buffer size.

        Returns:
            ``(read_array, samples read, number of bytes per sample)``.
        """
        samples_read = ctypes.c_int()
        number_of_bytes_per_sample = ctypes.c_int()

        cfunc = lib_importer.windll.DAQmxReadRaw
        if cfunc.argtypes is None:
            with cfunc.arglock:
                if cfunc.argtypes is None:
                    cfunc.argtypes = [
                        lib_importer.task_handle, ctypes.c_int, ctypes.c_double,
                        # argtypes are assigned once and reused by every later call, so
                        # the buffer type must not be tied to the dtype of the first
                        # array seen; dtype=None places no restriction on the dtype.
                        wrapped_ndpointer(dtype=None, flags=('C', 'W')),
                        ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
                        ctypes.POINTER(ctypes.c_int), ctypes.POINTER(c_bool32)]

        error_code = cfunc(
            task, num_samps_per_chan, timeout, read_array,
            read_array.nbytes, ctypes.byref(samples_read),
            ctypes.byref(number_of_bytes_per_sample), None)
        self.check_for_error(error_code, samps_per_chan_read=samples_read.value)

        return read_array, samples_read.value, number_of_bytes_per_sample.value

    ## write_analog_waveform has special handling
    def write_analog_waveform(
        self,
        task_handle: object,
        waveform: AnalogWaveform[Any],
        auto_start: bool,
        timeout: float
    ) -> int:
        """Write an analog waveform.

        Delegates to ``write_analog_f64`` using the waveform's sample count,
        the ``GROUP_BY_CHANNEL`` fill mode and the array produced by
        ``_get_analog_write_array``; that call's result is returned as is.
        """
        sample_count = waveform.sample_count
        fill_mode = FillMode.GROUP_BY_CHANNEL.value
        write_array = self._get_analog_write_array(waveform)
        return self.write_analog_f64(
            task_handle, sample_count, auto_start, timeout, fill_mode, write_array)

    ## write_analog_waveforms has special handling
    def write_analog_waveforms(
        self,
        task_handle: object,
        waveforms: Sequence[AnalogWaveform[Any]],
        auto_start: bool,
        timeout: float
    ) -> int:
        """Write analog waveforms.

        The per-channel sample count comes from ``get_num_samps_per_chan`` and
        one write array per waveform is produced by
        ``_get_analog_write_array``. The status returned by the per-channel
        write is passed to ``check_for_error`` along with the number of
        samples written.

        Returns:
            The number of samples per channel that were written.
        """
        samples_per_channel = get_num_samps_per_chan(waveforms)
        channel_arrays = list(map(self._get_analog_write_array, waveforms))

        status, num_written = self._internal_write_analog_waveform_per_chan(
            task_handle, samples_per_channel, auto_start, timeout, channel_arrays)

        self.check_for_error(status, samps_per_chan_written=num_written)
        return num_written

    def _get_analog_write_array(self, waveform: AnalogWaveform[Any]) -> numpy.typing.NDArray[numpy.float64]:
        """Return the waveform's ``scaled_data`` as a C-contiguous array.

        The array is returned untouched when it is already C-contiguous;
        otherwise a C-ordered copy is made.
        """
        samples = waveform.scaled_data
        return samples if samples.flags.c_contiguous else samples.copy(order="C")

    def _internal_write_analog_waveform_per_chan(
        self,
        task_handle: object,
        num_samps_per_chan: int,
        auto_start: bool,
        timeout: float,
        write_arrays: Sequence[numpy.typing.NDArray[numpy.float64]],
    ) -> tuple[
        int, # error code
        int, # The number of samples per channel that were written
    ]:
        """Call ``DAQmxInternalWriteAnalogWaveformPerChan`` with one array per channel.

        A table holding one ``double*`` per entry of ``write_arrays`` is
        built and passed to the C function together with the number of
        channels. The status is returned to the caller rather than checked
        here.

        Returns:
            A tuple ``(error code, samples per channel written)``.
        """
        assert isinstance(task_handle, TaskHandle)

        num_channels = len(write_arrays)
        assert num_channels > 0
        # Each per-channel array has to hold at least the requested sample count.
        assert all(arr.size >= num_samps_per_chan for arr in write_arrays)

        cfunc = lib_importer.windll.DAQmxInternalWriteAnalogWaveformPerChan
        if cfunc.argtypes is None:
            with cfunc.arglock:
                # Look again now that the lock is held so the prototype is
                # only assigned once.
                if cfunc.argtypes is None:
                    cfunc.argtypes = [
                        TaskHandle, ctypes.c_int, c_bool32, ctypes.c_double,
                        ctypes.POINTER(ctypes.POINTER(ctypes.c_double)),
                        ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
                        ctypes.POINTER(c_bool32),
                    ]

        double_ptr = ctypes.POINTER(ctypes.c_double)
        channel_ptrs = (double_ptr * num_channels)(
            *(arr.ctypes.data_as(double_ptr) for arr in write_arrays)
        )

        num_written = ctypes.c_int()
        status = cfunc(
            task_handle, num_samps_per_chan, auto_start, timeout,
            channel_ptrs, num_channels, ctypes.byref(num_written), None)

        return status, num_written.value

    def write_digital_waveform(
        self,
        task_handle: object,
        waveform: DigitalWaveform[Any],
        auto_start: bool,
        timeout: float,
    ) -> int:
        """Write a digital waveform.

        The waveform's ``signal_count`` is sent as a one-element ``uint32``
        array alongside the array from ``_get_digital_write_array``, using
        the ``GROUP_BY_CHANNEL`` fill mode. The resulting status is passed to
        ``check_for_error`` along with the number of samples written.

        Returns:
            The number of samples per channel that were written.
        """
        signal_counts = numpy.array([waveform.signal_count], dtype=numpy.uint32)

        status, num_written = self._internal_write_digital_waveform(
            task_handle, waveform.sample_count, auto_start, timeout,
            FillMode.GROUP_BY_CHANNEL.value,
            self._get_digital_write_array(waveform),
            signal_counts)

        self.check_for_error(status, samps_per_chan_written=num_written)
        return num_written

    def _get_digital_write_array(self, waveform: DigitalWaveform[Any]) -> numpy.typing.NDArray[numpy.uint8]:
        """Return the waveform's ``data`` as a C-contiguous ``uint8`` array.

        Data of any other dtype is reinterpreted with ``view(numpy.uint8)``.
        The result is copied in C order only when it is not already
        C-contiguous.
        """
        raw = waveform.data
        as_bytes = raw if raw.dtype == numpy.uint8 else raw.view(numpy.uint8)
        return as_bytes if as_bytes.flags.c_contiguous else as_bytes.copy(order="C")

    def write_digital_waveforms(
        self,
        task_handle: object,
        waveforms: Sequence[DigitalWaveform[Any]],
        auto_start: bool,
        timeout: float,
    ) -> int:
        """Write digital waveforms.

        Every waveform's data is copied into one zero-initialised ``uint8``
        buffer shaped (channel count, samples per channel, largest signal
        count), so waveforms with fewer signals stay zero-padded. The buffer
        and the per-waveform signal counts are written using the
        ``GROUP_BY_CHANNEL`` fill mode, and the resulting status is passed to
        ``check_for_error``.

        Returns:
            The number of samples per channel that were written.
        """
        num_channels = len(waveforms)
        samples_per_channel = get_num_samps_per_chan(waveforms)
        signal_counts = numpy.array(
            [waveform.signal_count for waveform in waveforms], dtype=numpy.uint32)

        # Staging buffer laid out as (numChans x numSampsPerChan x maxDataWidth).
        buffer_shape = (num_channels, samples_per_channel, max(signal_counts))
        staging = numpy.zeros(buffer_shape, dtype=numpy.uint8)
        for channel_buffer, waveform in zip(staging, waveforms):
            width = waveform.signal_count
            # channel_buffer is a view, so this fills the staging buffer in place.
            channel_buffer[:, :width] = waveform.data

        status, num_written = self._internal_write_digital_waveform(
            task_handle, samples_per_channel, auto_start, timeout,
            FillMode.GROUP_BY_CHANNEL.value, staging, signal_counts)

        self.check_for_error(status, samps_per_chan_written=num_written)
        return num_written

    def _internal_write_digital_waveform(
        self,
        task_handle: object,
        num_samps_per_chan: int,
        auto_start: bool,
        timeout: float,
        data_layout: int,
        write_array: numpy.typing.NDArray[numpy.uint8],
        bytes_per_chan_array: numpy.typing.NDArray[numpy.uint32] | None = None,
    ) -> tuple[
        int, # error code
        int, # The number of samples per channel that were written
    ]:
        """Call ``DAQmxInternalWriteDigitalWaveform`` with the given write buffer.

        ``bytes_per_chan_array`` may be ``None``, in which case ``None`` and a
        length of 0 are passed for it. The status is returned to the caller
        rather than checked here.

        Returns:
            A tuple ``(error code, samples per channel written)``.
        """
        assert isinstance(task_handle, TaskHandle)

        cfunc = lib_importer.windll.DAQmxInternalWriteDigitalWaveform
        if cfunc.argtypes is None:
            with cfunc.arglock:
                # Look again now that the lock is held so the prototype is
                # only assigned once.
                if cfunc.argtypes is None:
                    cfunc.argtypes = [
                        TaskHandle, ctypes.c_int, c_bool32, ctypes.c_double, c_bool32,
                        wrapped_ndpointer(dtype=numpy.uint8, flags=("C",)),
                        wrapped_ndpointer(dtype=numpy.uint32, flags=("C",)),
                        ctypes.c_uint, ctypes.POINTER(ctypes.c_int),
                        ctypes.POINTER(c_bool32),
                    ]

        if bytes_per_chan_array is None:
            bytes_per_chan_length = 0
        else:
            bytes_per_chan_length = bytes_per_chan_array.size

        num_written = ctypes.c_int()
        status = cfunc(
            task_handle, num_samps_per_chan, auto_start, timeout, data_layout,
            write_array, bytes_per_chan_array, bytes_per_chan_length,
            ctypes.byref(num_written), None)

        return status, num_written.value

    ## The datatype of 'write_array' is incorrect in daqmxAPISharp.json file.
    def write_raw(
            self, task_handle, num_samps_per_chan, auto_start, timeout, numpy_array):
        """Call ``DAQmxWriteRaw`` to write the raw samples in ``numpy_array``.

        The returned status is handed to ``check_for_error`` together with
        the number of samples per channel that were written.

        Returns:
            The number of samples per channel that were written.
        """
        samps_per_chan_written = ctypes.c_int()

        cfunc = lib_importer.windll.DAQmxWriteRaw
        if cfunc.argtypes is None:
            with cfunc.arglock:
                if cfunc.argtypes is None:
                    # The prototype is assigned only once, so it must not be
                    # tied to the dtype of whichever array is passed first:
                    # a later call with a different dtype would then fail
                    # argument conversion. dtype=None accepts any dtype while
                    # still requiring a C-contiguous array.
                    cfunc.argtypes = [
                        lib_importer.task_handle, ctypes.c_int, c_bool32,
                        ctypes.c_double,
                        wrapped_ndpointer(dtype=None, flags=('C',)),
                        ctypes.POINTER(ctypes.c_int), ctypes.POINTER(c_bool32)]

        error_code = cfunc(
            task_handle, num_samps_per_chan, auto_start, timeout, numpy_array,
            ctypes.byref(samps_per_chan_written), None)
        self.check_for_error(error_code, samps_per_chan_written=samps_per_chan_written.value)

        return samps_per_chan_written.value

    def hash_task_handle(self, task_handle):
        """Return the hash of the task handle's ``value`` attribute."""
        handle_value = task_handle.value
        return hash(handle_value)

    def check_for_error(self, error_code, samps_per_chan_written=None, samps_per_chan_read=None):
        """Turn a status code into a warning or an exception.

        A falsy code does nothing. A positive code is reported through
        ``warnings.warn`` as a ``DaqWarning`` built from ``get_error_string``.
        A negative code raises, using the text from
        ``get_extended_error_info``:

        * ``DaqReadError`` when ``samps_per_chan_read`` is not ``None``,
        * otherwise ``DaqWriteError`` when ``samps_per_chan_written`` is not
          ``None``,
        * otherwise ``DaqError``.
        """
        if not error_code:
            return

        if error_code > 0:
            warning_text = self.get_error_string(error_code)
            warnings.warn(DaqWarning(warning_text, error_code))
            return

        if error_code < 0:
            error_text = self.get_extended_error_info()

            # A read count takes precedence over a write count when both are given.
            if samps_per_chan_read is not None:
                raise DaqReadError(error_text, error_code, samps_per_chan_read)
            if samps_per_chan_written is not None:
                raise DaqWriteError(error_text, error_code, samps_per_chan_written)
            raise DaqError(error_text, error_code)


def is_string_buffer_too_small(error_code):
    """Return whether ``error_code`` is one of the two string-buffer size codes.

    The codes checked are ``DAQmxErrors.BUFFER_TOO_SMALL_FOR_STRING`` and
    ``DAQmxWarnings.CAPI_STRING_TRUNCATED_TO_FIT_BUFFER``.
    """
    if error_code == DAQmxErrors.BUFFER_TOO_SMALL_FOR_STRING:
        return True
    return error_code == DAQmxWarnings.CAPI_STRING_TRUNCATED_TO_FIT_BUFFER


def is_array_buffer_too_small(error_code):
    """Return whether ``error_code`` equals ``DAQmxErrors.WRITE_BUFFER_TOO_SMALL``."""
    buffer_too_small_code = DAQmxErrors.WRITE_BUFFER_TOO_SMALL
    return error_code == buffer_too_small_code
